{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Sending data to Spark cluster from Local instance\n",
    "\n",
    "suppose you would like to send pretrained scikit-learn model to your Spark cluster(e.g. for further usage with other packages like `spark-sklearn`)\n",
    "\n",
    "**warning** this example assumes that both (py)Spark cluster and your local machine both have the same python packages versions\n",
    "\n",
    "the following code requires numpy, scipy and scikit-learn to be installed"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can send strings:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "abc ሴ def\n"
     ]
    }
   ],
   "source": [
    "%%local\n",
    "\n",
    "s = u\"abc ሴ def\"\n",
    "print(s)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>16</td><td>None</td><td>pyspark</td><td>idle</td><td></td><td></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Successfully passed 's' as 's' to Spark kernel"
     ]
    }
   ],
   "source": [
    "%%send_to_spark -i s -t str -n s"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "abc ? def"
     ]
    }
   ],
   "source": [
    "# This runs in Spark; not sure why Unicode doesn't work\n",
    "print(s)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we train a model to get its precision:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "0.9763157894736842"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%%local\n",
    "from sklearn.model_selection import train_test_split\n",
    "from sklearn.datasets import load_iris\n",
    "import pandas as pd\n",
    "import pickle\n",
    "from sklearn import tree\n",
    "from sklearn.metrics import precision_score\n",
    "\n",
    "iris = load_iris()\n",
    "X_train, X_test, y_train, y_test = train_test_split(iris.data, iris.target, random_state=1)\n",
    "X_test_pd = pd.DataFrame(X_test, columns=['a','b','c','d'])\n",
    "Y_test_pd = pd.DataFrame(y_test, columns=['pred'])\n",
    "\n",
    "decision_tree = tree.DecisionTreeClassifier()\n",
    "decision_tree_model = decision_tree.fit(X_train, y_train)\n",
    "\n",
    "y_pred = decision_tree_model.predict(X_test)\n",
    "precision_score(y_test, y_pred, average='weighted')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "send test sets to `%spark`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Successfully passed 'X_test_pd' as 'X_test' to Spark kernel"
     ]
    }
   ],
   "source": [
    "%%send_to_spark -i X_test_pd -t df -n X_test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Successfully passed 'Y_test_pd' as 'y_test' to Spark kernel"
     ]
    }
   ],
   "source": [
    "%%send_to_spark -i Y_test_pd -t df -n y_test"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "because `pickle.dumps` returns `bytearray` we encode it to base64"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%local\n",
    "import codecs\n",
    "decision_tree_pickled = codecs.encode(pickle.dumps(decision_tree_model), \"base64\").decode()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Successfully passed 'decision_tree_pickled' as 'decision_tree_pickled' to Spark kernel"
     ]
    }
   ],
   "source": [
    "%%send_to_spark -i decision_tree_pickled -t str -n decision_tree_pickled"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "decode from base64 in `%spark`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "import pickle, codecs\n",
    "\n",
    "decision_tree_model = pickle.loads(codecs.decode(decision_tree_pickled.encode(), \"base64\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "convert Pyspark DataFrame into numpy arrays"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "import numpy as np\n",
    "y_test = np.array(y_test.select('pred').collect())\n",
    "X_test = np.array(X_test.collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "run pretrained classifier and see if its precision matches the `%local` model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0.9763157894736842"
     ]
    }
   ],
   "source": [
    "from sklearn.metrics import precision_score\n",
    "\n",
    "y_pred = decision_tree_model.predict(X_test)\n",
    "precision_score(y_test, y_pred, average='weighted')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "it does! we have successfully passed both string and pandas dataframe from `%local` to `%spark`"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 3
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
